Dans ce projet on cherche via un dataSet kaggle à créer un système de recommendation prenant une musique en input et proposant un certain nombre de titre similaire.
On se base sur un algorithme des K Nearest Neighbors et le projet est en scala.
On pourra retrouver ce notebook sur github
Étapes:
Préprocessing des données
Dans cette partie on supprime des abbérations du à des titres de musique trop longue qui amènenent le décalage des informations de notre DataFrame. On remarque aussi qu'il y a des duplicats de musique qu'il faudra traiter plus tard. Ces duplicats sont du à la possibilité pour une musique d'appartenir à plusieur genre différent. Ces musiques auront alors plusieurs lignes associées et même si leurs ID reste les mêmes elles peuvent avoir des mesures différentes.
Etude des différentes métriques
Cette partie permet d'expliquer le sens de chaque métrique disponible dans le dataset. Pour chaque métrique un graphique associé permet de se rendre compte de la répartition des données dans le dataset.
Les différentes métriques sont: genre, artist_name, track_name, track_id, popularity, acousticness, danceability, duration_ms, energy, instrumentalness, key, liveness, loudness, mode, speechiness, tempo, time_signature, valence.
Typage des données
Typage du dataset en fonction de leur valeurs.
Etude de la correlation de nos features
Etude de la matrice de correlation de nos features pour s'assurer que nos features ne sont pas trop corrélées.
Selection de nos features
On décide de ne pas se servir de duration_ms et popularity dans la suite de notre modèle car ces valeurs ne semblent pas pertinente pour un système de recommendation.
Preparation de nos données
Dans cette partie on va modifier les champs de nos données pour pouvoir les utiliser dans notre modèle.
Encodage de nos variable de texte avec StringIndexer
On modifie quatre de nos features contenant du texte, genre, key, mode et time_signature afin d'avoir un input numérique.
Utilisation d'un One Hot Encoder
Sur les mêmes features que le StringIndexer pour éviter à notre modèle de penser à une hiéarchie entre nos variables nous allons les passer dans un OneHotEncoder qui les transformera en vecteur de 0 et de 1.
Aggregation de nos données
Il faut régler le problème de la duplication des musiques dans notre dataset. De plus le genre de chaque musique nous intéresse il faut donc pouvoir supprimer les duplicats tout en gardant ces informations. Pour cela on aggrège chaque ligne ensemble en calculant la moyenne de chaque valeur numérique pour les mesures étant des entiers. Pour nos vecteurs créé avec le OneHotEncoder, on utilise la fonction max pour être sûr de ne pas perdre l'information des différents genres.
Vector Assembler
Afin d'avoir une seule colonne regroupant toutes nos features comme input de notre modèle.
Normalizer
Afin que chaque feature ait le même poids on normalize nos données.
Algorithme des plus proches voisins
On utilise la librairie de spark MLlib plus spécifiquement la classe BuckereRandomProjectionLSH qui est une approximation de la méthode des plus proches voisins dans le cas du traitement de grosse donnée.
Utilisation du model une fois entrainé
Définition d'une fonction permettant de réutiliser le modèle entrainé en prenant un track_id en input.
Défaut de l'étude:
Il n'y a pas de moyen de vérifier la précision du modèle, chacun doit essayer et écouter les titres proposés pour se rendre compte si le modèle marche ou non. Personnellement je trouve les résultats pertinents.
Amélioration:
Création d'une fonction pour la partie d'aggregation des musiques possédant de multiple lignes afin de pouvoir compacter la préparation des données en une seule pipeline.
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
%AddDeps org.vegas-viz vegas_2.11 0.3.10 --transitive
%AddDeps org.vegas-viz vegas-spark_2.11 0.3.10 --transitive
implicit val render = vegas.render.ShowHTML(kernel.display.content("text/html", _))
import vegas._
import vegas.render.WindowRenderer._
import vegas.sparkExt._
{val df: DataFrame = spark
.read
.option("header", true) // utilise la première ligne du (des) fichier(s) comme header
.option("inferSchema", "true") // pour inférer le type de chaque colonne (Int, String, etc.)
.csv("./data/SpotifyFeatures.csv")}
// df.printSchema()
println(s"Nombre de lignes : ${df.count}")
println(s"Nombre de colonnes : ${df.columns.length}")
%%dataFrame
df
%%dataframe --limit 5
df.filter(! col("key").isInCollection(Array("C", "G", "D", "C#", "A", "F", "B", "E", "A#", "F#", "G#", "D#")))
val data : DataFrame = df
.filter($"key".isin("C", "G", "D", "C#", "A", "F", "B", "E", "A#", "F#", "G#", "D#"))
%%dataframe --limit 5
data.groupBy("track_id", "artist_name", "track_name").count()
%%dataframe
data.filter($"track_name" === "EX - Remix")
On peut voir que certaines chansons apparaisent plusieurs fois dans nos datas elles ont des métriques quelque peu différentes mais l'information la plus importante est le genre du morceaux que l'on voudra regrouper pour chaque chanson. On s'en chargera par la suite.
Les descriptions des variables se basent sur la documentation de Spotify
The estimated overall key of the track. Integers map to pitches using standard Pitch Class notation . E.g. 0 = C, 1 = C♯/D♭, 2 = D, and so on. If no key was detected, the value is -1.
%%dataframe
data.groupBy("key").count().sort($"count".desc)
Vegas("Key")
.withDataFrame(data.groupBy("key").count())
.encodeX("key", Nom, scale=Scale(bandSize=50))
.encodeY("count", Quant)
.encodeColor(field="count", Quant, scale=Scale(rangeNominals=List("#EA98D2", "#659CCA")))
.mark(Bar)
.show
La plupart des clés ont leurs valeurs basées sur les lettres, il faudra supprimer les exceptions.
Mode indicates the modality (major or minor) of a track, the type of scale from which its melodic content is derived. Major is represented by 1 and minor is 0.
%%dataframe
data.groupBy("mode").count().sort($"count".desc)
Les valeurs principales sont "Major" et "Minor"
Vegas("Mode")
.withDataFrame(data.groupBy("mode").count())
.encodeX("mode", Nominal, scale=Scale(bandSize=50))
.encodeY("count", Quant)
.encodeColor(field="count", Quant, scale=Scale(rangeNominals=List("#EA98D2", "#659CCA")))
.mark(Bar)
.show
An estimated overall time signature of a track. The time signature (meter) is a notational convention to specify how many beats are in each bar (or measure).
%%dataframe
data.groupBy("time_signature").count().sort($"count".desc)
Vegas("Time Signature")
.withDataFrame(data.groupBy("time_signature").count())
.encodeX("time_signature", Nominal, scale=Scale(bandSize=50))
.encodeY("count", Quant)
.encodeColor(field="count", Quant, scale=Scale(rangeNominals=List("#EA98D2", "#659CCA")))
.mark(Bar)
.show
On voit que la majorité des valeur ont une valeur de time_signature de 4/4 ont peut donc supposer que cette variable ne sera pas très explicative.
A confidence measure from 0.0 to 1.0 of whether the track is acoustic. 1.0 represents high confidence the track is acoustic. The distribution of values for this feature look like this: Acousticness distribution
Vegas("Accousticness")
.withDataFrame(data.groupBy("acousticness").count())
.encodeX("acousticness", Quant)
.encodeY("count", Quant)
.encodeColor(field="count", Quant, scale=Scale(rangeNominals=List("#EA98D2", "#659CCA")))
.mark(Bar)
.show
Danceability describes how suitable a track is for dancing based on a combination of musical elements including tempo, rhythm stability, beat strength, and overall regularity. A value of 0.0 is least danceable and 1.0 is most danceable.
Vegas("Danceability")
.withDataFrame(data.groupBy("danceability").count())
.encodeX("danceability", Quant)
.encodeY("count", Quant)
.encodeColor(field="count", Quant, scale=Scale(rangeNominals=List("#EA98D2", "#659CCA")))
.mark(Bar)
.show
Energy is a measure from 0.0 to 1.0 and represents a perceptual measure of intensity and activity. Typically, energetic tracks feel fast, loud, and noisy. For example, death metal has high energy, while a Bach prelude scores low on the scale. Perceptual features contributing to this attribute include dynamic range, perceived loudness, timbre, onset rate, and general entropy.
Vegas("Energy")
.withDataFrame(data.groupBy("energy").count())
.encodeX("energy", Quant)
.encodeY("count", Quant)
.encodeColor(field="count", Quant, scale=Scale(rangeNominals=List("#EA98D2", "#659CCA")))
.mark(Bar)
.show
Predicts whether a track contains no vocals. “Ooh” and “aah” sounds are treated as instrumental in this context. Rap or spoken word tracks are clearly “vocal”. The closer the instrumentalness value is to 1.0, the greater likelihood the track contains no vocal content. Values above 0.5 are intended to represent instrumental tracks, but confidence is higher as the value approaches 1.0.
Vegas("Instrumentalness")
.withDataFrame(data.groupBy("instrumentalness").count())
.encodeX("instrumentalness", Quant)
.encodeY("count", Quant)
.encodeColor(field="count", Quant, scale=Scale(rangeNominals=List("#EA98D2", "#659CCA")))
.encodeSize(value=11L)
.mark(Bar)
.show
La grande majorité des musique sont donc avec voix
Detects the presence of an audience in the recording. Higher liveness values represent an increased probability that the track was performed live. A value above 0.8 provides strong likelihood that the track is live.
Vegas("Liveness")
.withDataFrame(data.groupBy("liveness").count())
.encodeX("liveness", Quant)
.encodeY("count", Quant)
.encodeColor(field="count", Quant, scale=Scale(rangeNominals=List("#EA98D2", "#659CCA")))
.encodeSize(value=11L)
.mark(Bar)
.show
The overall loudness of a track in decibels (dB). Loudness values are averaged across the entire track and are useful for comparing relative loudness of tracks. Loudness is the quality of a sound that is the primary psychological correlate of physical strength (amplitude). Values typical range between -60 and 0 db.
Vegas("Loudness")
.withDataFrame(data.groupBy("loudness").count())
.encodeX("loudness", Quant)
.encodeY("count", Quant)
.encodeColor(field="count", Quant, scale=Scale(rangeNominals=List("#EA98D2", "#659CCA")))
.encodeSize(value=11L)
.mark(Bar)
.show
Il peut exister des valeurs de loudness supérieur à 0 mais cela ne ressemble pas une erreur dans les données.
Speechiness detects the presence of spoken words in a track. The more exclusively speech-like the recording (e.g. talk show, audio book, poetry), the closer to 1.0 the attribute value. Values above 0.66 describe tracks that are probably made entirely of spoken words. Values between 0.33 and 0.66 describe tracks that may contain both music and speech, either in sections or layered, including such cases as rap music. Values below 0.33 most likely represent music and other non-speech-like tracks.
Vegas("Speechiness")
.withDataFrame(data.groupBy("speechiness").count())
.encodeX("speechiness", Quant)
.encodeY("count", Quant)
.encodeColor(field="count", Quant, scale=Scale(rangeNominals=List("#EA98D2", "#659CCA")))
.encodeSize(value=11L)
.mark(Bar)
.show
A measure from 0.0 to 1.0 describing the musical positiveness conveyed by a track. Tracks with high valence sound more positive (e.g. happy, cheerful, euphoric), while tracks with low valence sound more negative (e.g. sad, depressed, angry).
Vegas("Valence")
.withDataFrame(data.groupBy("valence").count())
.encodeX("valence", Quant)
.encodeY("count", Quant)
.encodeColor(field="count", Quant, scale=Scale(rangeNominals=List("#EA98D2", "#659CCA")))
.encodeSize(value=11L)
.mark(Bar)
.show
The overall estimated tempo of a track in beats per minute (BPM). In musical terminology, tempo is the speed or pace of a given piece and derives directly from the average beat duration.
Vegas("Tempo")
.withDataFrame(data.groupBy("tempo").count())
.encodeX("tempo", Quant)
.encodeY("count", Quant)
.encodeColor(field="count", Quant, scale=Scale(rangeNominals=List("#EA98D2", "#659CCA")))
.encodeSize(value=11L)
.mark(Bar)
.show
Pour choisir le type de chaque variable on se base sur la documentation de Spotify
val dfCasted: DataFrame = data
.withColumn("duration_ms", $"duration_ms".cast("Int"))
.withColumn("acousticness", $"acousticness".cast("Float"))
.withColumn("danceability", $"danceability".cast("Float"))
.withColumn("energy", $"energy".cast("Float"))
.withColumn("instrumentalness", $"instrumentalness".cast("Float"))
.withColumn("liveness", $"liveness".cast("Float"))
.withColumn("loudness", $"loudness".cast("Float"))
.withColumn("speechiness", $"speechiness".cast("Float"))
.withColumn("valence", $"valence".cast("Float"))
.withColumn("tempo", $"tempo".cast("Float"))
import org.apache.spark.ml.feature.VectorAssembler
val colCorr= Array("acousticness", "danceability", "duration_ms", "energy", "instrumentalness", "liveness", "loudness", "speechiness", "tempo", "valence")
val assembler = new VectorAssembler()
.setInputCols(colCorr)
.setOutputCol("features")
import org.apache.spark.ml.linalg.Matrix
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.Row
val Row(coeff1: Matrix) = Correlation.corr(assembler.transform(dfCasted), "features").head
val colNamePairs = colCorr.flatMap(name_from => colCorr.map(name_to => (name_from, name_to)))
val triplesList = colNamePairs.zip(coeff1.toArray)
.filterNot{case((name_from, name_to), corr) => name_from >= name_to}
.map{case((name_from, name_to), corr) => (name_from, name_to, corr)}
val corrDf = sc.parallelize(triplesList).toDF("name_from", "name_to", "corr")
corrDf.sort($"corr".desc).show(5)
corrDf.sort($"corr").show(5)
dfCasted.printSchema
Comme aucune de nos variables n'est trop corrélée ( < 0.95) nous pouvons créer un modèle avec toutes les variables précédentes.
Néanmoins l'utilisation de duration_ms et popularity ne semble pas utile pour un système de recommendation basé sur la similitude d'une musique.
Pour pouvoir utiliser toutes les informations à notre disposition il nous faut modifier nos données pour quelle puisse être utilisé par un algorithme de KNN.
Notamment les variables genre, mode, key, time_signature qui sont sous format string.
Le StringIndexer encode nos labels en indice.
On a nos quatres features genre, key, mode et time_signature à modifier.
import org.apache.spark.ml.feature.StringIndexer
val genreIndexer = new StringIndexer()
.setInputCol("genre")
.setOutputCol("genreIndex")
val modeIndexer = new StringIndexer()
.setInputCol("mode")
.setOutputCol("modeIndex")
val keyIndexer = new StringIndexer()
.setInputCol("key")
.setOutputCol("keyIndex")
val tsIndexer = new StringIndexer()
.setInputCol("time_signature")
.setOutputCol("tsIndex")
Pour éviter à notre modèle de penser à une hiéarchie entre nos variables nous allons les passer dans un OneHotEncoder qui les transformera en vecteur de 0 et de 1.
import org.apache.spark.ml.feature.OneHotEncoder
val genreEncoder = new OneHotEncoder()
.setInputCol("genreIndex")
.setOutputCol("genreVec")
val modeEncoder = new OneHotEncoder()
.setInputCol("modeIndex")
.setOutputCol("modeVec")
val keyEncoder = new OneHotEncoder()
.setInputCol("keyIndex")
.setOutputCol("keyVec")
val tsEncoder = new OneHotEncoder()
.setInputCol("tsIndex")
.setOutputCol("tsVec")
import org.apache.spark.ml.{Pipeline, PipelineModel}
val pipeline = new Pipeline()
.setStages(Array(genreIndexer, modeIndexer, keyIndexer, tsIndexer,
genreEncoder, modeEncoder, keyEncoder, tsEncoder))
val dfEncode = pipeline.fit(dfCasted).transform(dfCasted)
dfEncode.select("genre","genreIndex", "genreVec").show(1)
Il reste le problème des doublons à régler.
Pour nos valeurs obtenus grâce au One Hot Encoder on utilise la classe Summarizer et la méthode max comme nous somme en présence de vecteur composé de 1 et 0 on obtiendra les différents genre et les variable mode, key, time_signature ne seront pas modifié. (Elles ne variaient pas en fonction du genre.)
Pour les autres métriques ont fait le choix de prendre la moyenne des valeurs sur les différentes itérations comme celle-ci pouvait varier faiblement.
import org.apache.spark.ml.stat.Summarizer
val dfDistinct = dfEncode
.groupBy("track_id", "artist_name", "track_name")
.agg(mean("acousticness").alias("acousticness"),
mean("danceability").alias("danceability"),
mean("energy").alias("energy"),
mean("instrumentalness").alias("instrumentalness"),
mean("liveness").alias("liveness"),
mean("loudness").alias("loudness"),
mean("speechiness").alias("speechiness"),
mean("tempo").alias("tempo"),
mean("valence").alias("valence"),
Summarizer.max($"modeVec").alias("modeVec"),
Summarizer.max($"keyVec").alias("keyVec"),
Summarizer.max($"tsVec").alias("tsVec"),
Summarizer.max($"genreVec").alias("genreVec"))
Afin d'avoir une seule colonne regroupant toutes nos features on utilise un VectorAssembler.
import org.apache.spark.ml.feature.VectorAssembler
val assembler = new VectorAssembler()
.setInputCols(Array("acousticness", "danceability", "energy", "instrumentalness", "liveness", "loudness", "speechiness", "tempo", "valence", "modeVec", "keyVec", "tsVec", "genreVec"))
.setOutputCol("features")
Comme nous effectuons un K Nearest Neighbors algorithme nous avons besoin de normaliser nos données sinon il n'y aurait pas de sens dans les distances que nous calculons.
import org.apache.spark.ml.feature.Normalizer
val normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
val assemblerPipeline = new Pipeline()
.setStages(Array(assembler, normalizer))
val dfClean = assemblerPipeline.fit(dfDistinct).transform(dfDistinct)
J'utilise un LSH algorithms plus précisément le BuckereRandomProjectionLSH pour calculer mes buckets de nearest neighbors
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
val brp = new BucketedRandomProjectionLSH()
.setBucketLength(5.0)
.setNumHashTables(3)
.setInputCol("features")
.setOutputCol("hashes")
Notre pipeline est terminé on va maintenant séparer nos données pour entrainer puis challenger notre model.
Il nous suffit de prendre les artistes et musique qu l'on veut tester.
Je fait le choix personnel de prendre l'artiste Tyler the Creator.
On fait aussi le choix que nos recommendations ne renvoient pas des musiques du même artiste.
val tylerData = dfClean.filter($"artist_name" === "Tyler, The Creator")
val training = dfClean.filter($"artist_name" =!= "Tyler, The Creator")
val model = brp.fit(training)
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
val key = tylerData.filter($"track_name" === "See You Again").select("features").rdd.map { case Row(v: Vector) => v}.first
val resultTyler = model.approxNearestNeighbors(training, key, 4)
%%dataframe
resultTyler.select("artist_name", "track_name")
%%dataframe
tylerData.filter($"track_name" === "See You Again").select("track_id","artist_name", "track_name", "acousticness", "danceability", "energy", "instrumentalness", "liveness", "loudness", "valence")
resultTyler
.select("track_id", "artist_name", "track_name", "acousticness", "danceability", "energy", "instrumentalness", "liveness", "loudness", "valence").show(false)
df.filter($"track_name" === "See You Again" && $"artist_name" === "Tyler, The Creator").select("genre").show()
df.filter($"track_id" === "0OT0cCKbSmSMRvyWeqEFBq" || $"track_id" === "4kdfjhj9xNkYU0R8xlDy8k" || $"track_id" === "1PNfhBdmFikFn4vkrwiq05" || $"track_id" === "1wIQtB3UQ1TfjNMZZqO6eh")
.select("genre", "track_name", "artist_name").show()
L'algorithme semble avoir fonctionné étant donné les musiques proposées par notre modèle.
Je n'ai pas de métrique me permettant de juger de la qualité de mon analyse. Je propose donc une fonction permettant à chaque utilisateur du notebook de facilement rechercher des recommendations pour un titre de chanson.
import org.apache.spark.sql.Dataset
def recommend(track_id: String, brp: BucketedRandomProjectionLSH, dfClean: DataFrame): Dataset[_] = {
var training = dfClean.filter($"track_id" =!= track_id)
var test = dfClean.filter($"track_id" === track_id)
var model = brp.fit(training)
var key = test.filter($"track_id" === track_id).select("features").rdd.map { case Row(v: Vector) => v}.first
var result = model.approxNearestNeighbors(training, key, 4)
return result
}
val result = recommend("7KA4W4McWYRpgf0fWsJZWB", brp, dfClean)
%%dataframe
result.select("artist_name", "track_name")